Amazon AthenaのMERGE文でWHEN NOT MATCHED BY SOURCEを再現する
データ事業本部インテグレーション部機械学習チーム・新納(にいの)です。
Amazon AthenaではIcebergフォーマットでテーブルを作成した場合、条件に応じてデータの挿入(INSERT)、更新(UPDATE)、削除(DELETE)を1つの文で行うMERGE文が利用可能です。
ただし、AthenaでサポートされているMERGE文のWHEN句はWHEN MATCHED
もしくは WHEN NOT MATCHED
のみです。BigQueryでサポートされているようなWHEN NOT MATCHED BY SOURCE
(ターゲットテーブルにしかキーが合致するレコードが存在しない場合)はサポートされていません。
AthenaでもMERGE文の書き方を工夫するとこのWHEN NOT MATCHED BY SOURCE
を再現できます。
MERGEの条件
このブログではMERGE文による更新対象のテーブルをターゲットテーブル、結合元のテーブルをソーステーブルと表現します。
Amazon AthenaでサポートされているMERGE文のWHEN句の条件は以下の通りです。
WHEN句 | 説明 |
---|---|
WHEN MATCHED THEN DELETE |
ソーステーブルとターゲットテーブルのキーが一致する場合、レコードを削除 |
WHEN MATCHED THEN UPDATE SET ( column = expression [, ...] ) |
ソーステーブルとターゲットテーブルのキーが一致する場合、レコードを更新する |
WHEN NOT MATCHED THEN INSERT (column_name[, column_name ...]) VALUES (expression, ...) |
ソーステーブルに存在するが、ターゲットテーブルに存在しない行の場合、ターゲットテーブルへインサート |
全て、BigQueryでいうところのWHEN NOT MATCHED BY TARGET
に相当する動作です。ターゲットテーブルに存在するが、ソーステーブルに存在しない行の場合のWHEN NOT MATCHED BY SOURCE
はサポートされていません。
AthenaでWHEN NOT MATCHED BY SOURCEを再現する
以下の記事を参考にMERGE文を作成しています。
Athenaで再現するべく、2種類のテーブルを作成しました。更新対象となるターゲットテーブルは以下の通り。ターゲットテーブルにのみid=3のレコードがあります。
結合元のソーステーブルは以下の通り。
今回はWHEN NOT MATCHED BY SOURCE THEN DELETE
(ターゲットテーブルに存在するが、ソーステーブルに存在しない行を削除する)を再現します。
USING句の中でサブクエリを使用し、以下の条件を記述します。
- ターゲット・ソーステーブルをMERGEキーでLEFT JOIN
- マージ条件に一致したレコード(ソーステーブルに存在しないターゲットテーブルのレコード)を削除
MERGE INTO target_iceberg_table USING (
SELECT target_iceberg_table.id
FROM target_iceberg_table
LEFT JOIN source_iceberg_table ON target_iceberg_table.id = source_iceberg_table.id
WHERE source_iceberg_table.id is null
) s ON s.id = target_iceberg_table.id
WHEN matched THEN DELETE;
実行すると、更新対象とするターゲットテーブルはソーステーブルの中身と同一になり、ソーステーブルに存在しないがターゲットテーブルに存在する行は削除されていることがわかりました。
一つのMERGE文で更新・挿入・削除を実現する
USING句でサブクエリを使う方法でWHEN NOT MATCHED BY SOURCE THEN DELETE
を実現できましたが、このままでは一つのMERGE文で削除しかできません。一度に更新・挿入・削除を実現してみましょう。
今回のターゲットテーブルは以下の通り。
ソーステーブルは以下の通り。
このテーブルに対して、以下の処理を実現します。
- マージキーにマッチしたレコード→UPDATE
- 上記テーブルのid=2
- ソーステーブルのみにの存在するレコード→INSERT
- ソーステーブルのid=4
- ターゲットテーブルのみに存在するレコード→DELETE
- ターゲットテーブルのid=3
なんだかややこしいですが、図にするとこんな感じです。
このテーブルに対して以下のMERGE文を実行します。
ポイントは以下の通り。
- USING句内のサブクエリでソーステーブルとターゲットテーブルを結合
- 各レコードに対して'UPDATE'、'INSERT'、'DELETE'のアクションを決定し、WHEN MATCHEDの条件として指定する方法があります。
MERGE INTO target_iceberg_table
USING (
SELECT s.*,
CASE
WHEN t.id IS NULL THEN 'INSERT' ELSE 'UPDATE'
END AS merge_action
FROM source_iceberg_table s
LEFT JOIN target_iceberg_table t ON s.id = t.id
UNION ALL
SELECT t.*,
'DELETE' AS merge_action
FROM target_iceberg_table t
LEFT JOIN source_iceberg_table s ON t.id = s.id
WHERE s.id IS NULL
) merged_data
ON target_iceberg_table.id = merged_data.id
WHEN MATCHED
AND merged_data.merge_action = 'UPDATE'
THEN UPDATE SET
name = merged_data.name,
record_time = merged_data.record_time
WHEN MATCHED
AND merged_data.merge_action = 'DELETE'
THEN DELETE
WHEN NOT MATCHED THEN
INSERT (id, name, record_time)
VALUES (
merged_data.id,
merged_data.name,
merged_data.record_time
)
実行後、期待通りソーステーブルとターゲットテーブルのレコードが同一となりました。
最後に
Amazon AthenaでWHEN NOT MATCHED BY SOURCE
に相当する動作を再現してみました。USING句でソーステーブルとターゲットテーブルをJOINしている関係上、データ量が多いとパフォーマンスに影響が出る恐れがあるのでご留意ください。